Real-Time Predictive Analytics [Tweets Analysis]

Techniques used:
1. TF-IDF
2. Naive Bayes classification

A socket server is used to publish data from a file to port 9000; a minimal sketch is shown below.
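The socket_server.py script itself is not included in this notebook; a minimal sketch (the file name and send pacing are illustrative assumptions, one tweet per line) could look like this:

# socket_server.py -- minimal sketch; file name is an illustrative assumption
import socket
import time

HOST, PORT = "localhost", 9000

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((HOST, PORT))
server.listen(1)

conn, addr = server.accept()           # wait for the Spark receiver to connect
with open("data/tweets.txt") as f:     # one tweet per line (illustrative path)
    for line in f:
        print("Publishing:", line.strip())
        conn.sendall((line.strip() + "\n").encode("utf-8"))
        time.sleep(1)                  # pace roughly one tweet per second
conn.close()
server.close()
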
# -*- coding: utf-8 -*-

import os
import sys

# Work from the project directory
os.chdir("/home/cloudops/spark")
os.getcwd()

# Configure the environment. Set this to the directory
# where Spark is installed
if 'SPARK_HOME' not in os.environ:
    os.environ['SPARK_HOME'] = '/opt/spark'

# Create a variable for our root path
SPARK_HOME = os.environ['SPARK_HOME']

# Add the following paths to the system path. Please check your installation
# to make sure that these zip files actually exist. The names might change
# as versions change.
sys.path.insert(0,os.path.join(SPARK_HOME,"python"))
sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib"))
sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib","pyspark.zip"))
sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib","py4j-0.9-src.zip"))

# Initialize Spark. Once the context exists, the rest of the pipeline can run
from pyspark import SparkContext
from pyspark import SparkConf

# Optionally configure Spark Settings
conf=SparkConf()
conf.set("spark.executor.memory", "1g")
conf.set("spark.cores.max", "2")

conf.setAppName("Spark-Analysis")

## Initialize SparkContext. Run this only once,
# otherwise you get a "multiple SparkContexts" error.
# For streaming, create the context with 2 threads.
sc = SparkContext('local[2]', conf=conf)
# <SparkContext master=local[2] appName=Spark-Analysis>

# =====================================
#   Building and saving the Model
# =====================================

tweetData = sc.textFile("data/movietweets.csv")
tweetData.collect()
# ['positive,The Da Vinci Code book is just awesome.',
#  'positive,i liked the Da Vinci Code a lot.',
#  'positive,i liked the Da Vinci Code a lot.',
# . . .

# Keep only the tweet text; split on the first comma only,
# since the text itself may contain commas
tweetText = tweetData.map(lambda line: line.split(",", 1)[1])
tweetText.collect()
# ['The Da Vinci Code book is just awesome.',
# 'i liked the Da Vinci Code a lot.',
# 'i liked the Da Vinci Code a lot.',
# . . .

# =====================================
# Use TF-IDF
# =====================================
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.feature import IDF

hashingTF = HashingTF()
tf = hashingTF.transform(tweetText)
tf.cache()

# Transform to IDF
idf = IDF(minDocFreq=2).fit(tf)
tfidf = idf.transform(tf)
tfidf.cache()
tfidf.count()     # 100 -- one TF-IDF vector per tweet

# Pair each original record with its TF-IDF vector
xformedData = tweetData.zip(tfidf)
xformedData.cache()
xformedData.collect()
# sparse vectors paired with the original sentiment labels
# [('positive,The Da Vinci Code book is just awesome.',
#  SparseVector(1048576, {105642: 0.2713, 151034: 0.3524, 173013: 0.9262, 173606: 0.2976,
#   186435: 0.7439, 211440: 1.0999, 238153: 0.1015,
#   244458: 3.0057, 263483: 0.3247, 265159: 0.2031,
#   296409: 0.1616, 335453: 0.8775, 469732: 1.1494,
#   702216: 0.1212, 702740: 0.0508, 734443: 0.472,
#   777769: 0.9516, 793623: 0.3111, 875351: 0.0,
#   891534: 0.0508, 897367: 0.6833, 968035: 0.0508})),
# . . .
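
# What HashingTF did above: each term is hashed directly to a column
# index in [0, 2**20) -- the default numFeatures of 1048576 visible in
# the SparseVectors -- so no vocabulary needs to be stored. E.g.:
hashingTF.indexOf("awesome")   # deterministic bucket index for a term
# IDF then weights each term by log((N + 1) / (df(t) + 1)); terms seen
# in fewer than minDocFreq=2 documents are zeroed out, which is where
# the 0.0 entries in the vectors above come from.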

# =====================================
from pyspark.mllib.regression import LabeledPoint

def convertToLabeledPoint(inVal):
    # inVal is a (original CSV line, TF-IDF vector) pair
    origAttr = inVal[0].split(",", 1)
    # Label encoding: 0.0 = positive, 1.0 = negative
    sentiment = 0.0 if origAttr[0] == "positive" else 1.0
    return LabeledPoint(sentiment, inVal[1])

# Convert every (tweet, vector) pair to a LabeledPoint
tweetLp = xformedData.map(convertToLabeledPoint)
tweetLp.cache()
tweetLp.collect()
# [LabeledPoint(0.0, (1048576,[105642,151034,173013,173606,186435,211440,238153,
#  244458,263483,265159,296409,335453,469732,702216,702740,734443,777769,793623,
# 875351,891534,897367,968035],
# . . .
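
# Quick sanity check on class balance (the 50/50 split matches the
# label totals in the confusion matrix further down):
tweetLp.map(lambda lp: lp.label).countByValue()
# defaultdict(<class 'int'>, {0.0: 50, 1.0: 50})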

# =====================================
# Naive Bayes Classification
# =====================================
from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel

# Train with Laplace smoothing parameter 1.0
model = NaiveBayes.train(tweetLp, 1.0)

predictionAndLabel = tweetLp.map(lambda p:
     (float(model.predict(p.features)),
      float(p.label)))

predictionAndLabel.collect()
# [(0.0, 0.0),
# (1.0, 0.0),
# (1.0, 0.0),
# . . .

# =====================================
# Convert to DF and form the Confusion Matrix
# =====================================
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

# createDataFrame accepts the RDD of (prediction, label) pairs directly
predDF = sqlContext.createDataFrame(predictionAndLabel,
                ["prediction", "label"])

predDF.groupBy("label", "prediction").count().show()
# Confusion matrix for the model (on the training data)
# +-----+----------+-----+
# |label|prediction|count|
# +-----+----------+-----+
# |  1.0|       1.0|   47|
# |  0.0|       1.0|   15|   <-- small data set
# |  1.0|       0.0|    3|
# |  0.0|       0.0|   35|
# +-----+----------+-----+
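
# From the matrix: 47 + 35 = 82 of the 100 tweets are classified
# correctly (on the training data itself), i.e. accuracy = 0.82.
# The same number computed directly:
accuracy = (predictionAndLabel
            .filter(lambda pl: pl[0] == pl[1])
            .count() / float(predictionAndLabel.count()))
accuracy   # 0.82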

# =====================================
#          Save the Model
# model.save(sc,"TweetsSentimentModel")
# =====================================
import pickle

with open('tweetsSentiModel', 'wb') as f:
    pickle.dump(model, f)
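
# MLlib's native persistence (the commented-out model.save above) is an
# alternative to pickle; its loading counterpart would be:
# loadedNative = NaiveBayesModel.load(sc, "TweetsSentimentModel")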

# =====================================
#               Real Time
# Getting tweets in Real Time and making predictions
# =====================================
import pickle

from pyspark.mllib.classification import  NaiveBayesModel

with open('tweetsSentiModel', 'rb') as f:
    loadedModel = pickle.load(f)

# =====================================
# Read a Streaming context
# =====================================
from pyspark.streaming import StreamingContext

# Micro-batch interval: 1 second
streamContext = StreamingContext(sc, 1)

# Stream data from the socket server listening on port 9000
# (run socket_server.py first -- see the sketch at the top)
tweets = streamContext.socketTextStream("localhost", 9000)

# =====================================
# Action - generate tweets and predict
# =====================================
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.feature import IDF

# Broadcast the loaded model so it can be reused in every batch
bc_model = sc.broadcast(loadedModel)

def predictSentiment(tweetText):
    '''Predict sentiment for every tweet in the micro-batch RDD.'''
    # Skip empty micro-batches: IDF.fit() fails on an empty RDD
    if tweetText.isEmpty():
        return
    nbModel = bc_model.value

    # HashingTF transform
    hashingTF = HashingTF()
    tf = hashingTF.transform(tweetText)
    tf.cache()
    # IDF transform: text -> TF-IDF vector. Note that IDF is refit on
    # each micro-batch, so its document frequencies differ from those
    # of the training corpus.
    idf = IDF(minDocFreq=2).fit(tf)
    tfidf = idf.transform(tf)
    tfidf.cache()
    # Use the broadcast model for prediction
    prediction = nbModel.predict(tfidf)

    print("Predictions for this window: ", end='')
    # Collect once and pair each prediction with its tweet
    for pred, text in zip(prediction.collect(), tweetText.collect()):
        print(pred, text)

# Each completed micro-batch arrives as a new RDD ->
# predictSentiment is called on it ->
# predictions are printed for the tweets in that batch
tweets.foreachRDD(predictSentiment)

# Run streaming
streamContext.start()
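# In a standalone script you would normally block here with
# streamContext.awaitTermination(); in this interactive session the
# stream is stopped manually at the end.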
# Socket server:
# Publishing: The Da Vinci Code sucked
# Publishing: The Da Vinci Code was awesome...
# Publishing: da vinci code is awesome.
# Publishing: oh so beautiful Da Vinci Code...
# . . .

# Predictions for this window: 0.0 Client on 127.0.0.1The Da Vinci Code sucked
# Predictions for this window: 0.0 The Da Vinci Code was awesome...
# Predictions for this window: 0.0 da vinci code is awesome.
# . . .

streamContext.stop()